package com.snail.common.mq.kafka.factory;

import com.alibaba.fastjson2.JSON;
import com.snail.common.core.utils.StringUtils;
import com.snail.common.mq.annotation.SnailMQLog;
import com.snail.common.mq.constants.MqConstants;
import com.snail.common.mq.core.AbstractMqFactory;
import com.snail.common.mq.core.IMqProducerAndConsumerHandler;
import com.snail.common.mq.core.dto.MqDto;
import com.snail.common.mq.core.dto.MqResultDto;
import com.snail.common.mq.exception.MqException;
import org.springframework.kafka.annotation.KafkaHandler;

/**
 * @Description: Kafka message factory. Produces messages by delegating to the
 * injected producer/consumer handler and consumes them through a
 * {@link KafkaHandler} method that runs the business callback under a lock.
 * @Author: Snail
 * @CreateDate: 2023/11/21 11:29
 * @Version: V1.0
 */
public abstract class AbstractKafkaMqFactory<T> extends AbstractMqFactory<T> {


    /**
     * Creates the factory and hands the handler to the parent class.
     *
     * @param kafkaProducerAndConsumerHandler handler passed to the superclass constructor
     */
    public AbstractKafkaMqFactory(IMqProducerAndConsumerHandler kafkaProducerAndConsumerHandler) {
        super(kafkaProducerAndConsumerHandler);
    }

    /**
     * Produces a message by delegating to {@code mqProducerAndConsumerHandler.sendMq}.
     *
     * @param mqDto message body
     * @return result returned by the handler
     * @throws MqException on failure
     */
    @Override
    @SnailMQLog(type = MqConstants.PRODUCER, mq = MqConstants.KAFKA_TYPE)
    public MqResultDto sendMq(MqDto<T> mqDto) throws MqException {
        return mqProducerAndConsumerHandler.sendMq(mqDto);
    }

    /**
     * Consumes a message: validates and parses the raw JSON value, verifies via
     * {@code getProducerCount} that a matching producer record exists, then runs
     * {@code consumeMq} while holding the lock keyed by topic and primary key.
     * <p>
     * If the lock cannot be acquired the message is skipped without error, and the
     * lock is released only when this invocation actually acquired it.
     *
     * @param value raw message body (JSON representation of {@link MqDto})
     * @throws MqException if the value is empty, parses to {@code null}, has no
     *                     producer record, or consumption fails
     */
    @Override
    @KafkaHandler
    @SnailMQLog(type = MqConstants.CONSUMER, mq = MqConstants.KAFKA_TYPE)
    protected void listener(String value) throws MqException {
        if (StringUtils.isEmpty(value)) {
            throw new MqException("snail kafka listener value isEmpty ");
        }
        // The class token is raw, so the assignment to MqDto<T> is unchecked.
        // NOTE(review): the payload is presumably not bound to T by the parser - confirm.
        @SuppressWarnings("unchecked")
        MqDto<T> mqDto = JSON.parseObject(value, MqDto.class);
        if (mqDto == null) {
            // Fail with a descriptive error instead of a NullPointerException below.
            throw new MqException("snail kafka listener value parsed to null");
        }
        // Lock key built from topic and primary key
        String key = StringUtils.format(MqConstants.LOCK_KEY, mqDto.getTopic() + "_" + mqDto.getPriKey());
        // Refuse to consume a message for which no producer record is found
        if (getProducerCount(mqDto) == 0) {
            throw new MqException(StringUtils.format("消费失败：消息{}不存在!", mqDto.getPriKey()));
        }
        // Tracks whether this invocation holds the lock, so that the finally block
        // never releases a lock it did not acquire.
        boolean locked = false;
        try {
            locked = lock.getLock(key);
            if (locked) {
                // Business consumption
                consumeMq(mqDto);
            }
        } catch (Exception e) {
            // Keep an MqException intact (original instance and stack trace).
            if (e instanceof MqException) {
                throw (MqException) e;
            }
            throw new MqException(e.getMessage());
        } finally {
            if (locked) {
                lock.unLock(key);
            }
        }
    }
}
